---
id: streaming-updates
title: Streaming Updates
sidebar_label: Streaming Updates
hide_title: true
description: 'RTK Query > Usage > Streaming Updates: updating cache from pushed messages'
---

&nbsp;

# Streaming Updates

## Overview

RTK Query gives you the ability to receive **streaming updates** for persistent queries. This enables a query to establish an ongoing connection to the server (typically using WebSockets), and apply updates to the cached data as additional information is received from the server.

Streaming updates can be used to enable the API to receive real-time updates to the back-end data, such as new entries being created, or important properties being updated.

To enable streaming updates for a query, pass the asynchronous `onCacheEntryAdded` function to the query, including the logic for how to update the query when streamed data is received. See [`onCacheEntryAdded` API reference](../api/createApi#oncacheentryadded) for more details.

## When to use streaming updates

Primarily updates to query data should be done via [`polling`](./polling) intermittently on an interval, using [`cache invalidation`](./automated-refetching#advanced-invalidation-with-abstract-tag-ids) to invalidate data based on tags associated with queries & mutations, or with [`refetchOnMountOrArgChange`](../api/createApi#refetchonmountorargchange) to fetch fresh data when a component using the data mounts.

However, streaming updates is particularly useful for scenarios involving:

- _Small, frequent changes to large objects_. Rather than repeatedly polling for a large object, the object can be fetched with an initial query, and streaming updates can update individual properties as updates are received.
- _External event-driven updates_. Where data may be changed by the server or otherwise external users and where real-time updates are expected to be shown to an active user, polling alone would result in periods of stale data in between queries, causing state to easily get out of sync. Streaming updates can update all active clients as the updates occur rather than waiting for the next interval to elapse.

Example use cases that benefit from streaming updates are:

- GraphQL subscriptions
- Real-time chat applications
- Real-time multiplayer games
- Collaborative document editing with multiple concurrent users

## Using the `onCacheEntryAdded` Lifecycle

The `onCacheEntryAdded` lifecycle callback lets you write arbitrary async logic that will be executed after a new cache entry is added to the RTK Query cache (ie, after a component has created a new subscription to a given endpoint+params combination).

`onCacheEntryAdded` will be called with two arguments: the `arg` that was passed to the subscription, and an options object containing "lifecycle promises" and utility functions. You can use these to write sequenced logic that waits for data to be added, initiates server connections, applies partial updates, and cleans up the connection when the query subscription is removed.

Typically, you will `await cacheDataLoaded` to determine when the first data has been fetched, then use the `updateCacheData` utility to apply streaming updates as messages are received. `updateCacheData` is an Immer-powered callback that receives a `draft` of the current cache value. You may "mutate" the draft value to update it as needed based on the received values. RTK Query will then dispatch an action that applies a diffed patch based on those changes.

Finally, you can `await cacheEntryRemoved` to know when to clean up any server connections.

## Streaming Update Examples

### Websocket Chat API

```ts
// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`
  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    getMessages: build.query<Message[], Channel>({
      query: (channel) => `messages/${channel}`,
      // highlight-start
      async onCacheEntryAdded(
        arg,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved },
      ) {
        // create a websocket connection when the cache subscription starts
        const ws = new WebSocket('ws://localhost:8080')
        try {
          // wait for the initial query to resolve before proceeding
          await cacheDataLoaded

          // when data is received from the socket connection to the server,
          // if it is a message and for the appropriate channel,
          // update our query result with the received message
          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCachedData((draft) => {
              draft.push(data)
            })
          }

          ws.addEventListener('message', listener)
        } catch {
          // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
          // in which case `cacheDataLoaded` will throw
        }
        // cacheEntryRemoved will resolve when the cache subscription is no longer active
        await cacheEntryRemoved
        // perform cleanup steps once the `cacheEntryRemoved` promise resolves
        ws.close()
      },
      // highlight-end
    }),
  }),
})

export const { useGetMessagesQuery } = api
```

#### What to expect

When the `getMessages` query is triggered (e.g. via a component mounting with the `useGetMessagesQuery()` hook), a `cache entry` will be added based on the serialized arguments for the endpoint. The associated query will be fired off based on the `query` property to fetch the initial data for the cache. Meanwhile, the asynchronous `onCacheEntryAdded` callback will begin, and create a new WebSocket connection. Once the response for the initial query is received, the cache will be populated with the response data, and the `cacheDataLoaded` promise will resolve. After awaiting the `cacheDataLoaded` promise, the `message` event listener will be added to the WebSocket connection, which updates the cache data when an associated message is received.

When there are no more active subscriptions to the data (e.g. when the subscribed components remain unmounted for a sufficient amount of time), the `cacheEntryRemoved` promise will resolve, allowing the remaining code to run and close the websocket connection. RTK Query will also remove the associated data from the cache.

If a query for the corresponding cache entry runs later, it will overwrite the whole cache entry, and the streaming update listeners will continue to work on the updated data.

### Websocket Chat API with a transformed response shape

```ts
// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`

  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

// highlight-start
const messagesAdapter = createEntityAdapter<Message>()
// highlight-end
export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    // highlight-start
    getMessages: build.query<EntityState<Message, number>, Channel>({
      // highlight-end
      query: (channel) => `messages/${channel}`,
      // highlight-start
      transformResponse(response: Message[]) {
        return messagesAdapter.addMany(
          messagesAdapter.getInitialState(),
          response,
        )
      },
      // highlight-end
      async onCacheEntryAdded(
        arg,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved },
      ) {
        const ws = new WebSocket('ws://localhost:8080')
        try {
          await cacheDataLoaded

          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCachedData((draft) => {
              // highlight-start
              messagesAdapter.upsertOne(draft, data)
              // highlight-end
            })
          }

          ws.addEventListener('message', listener)
        } catch {}
        await cacheEntryRemoved
        ws.close()
      },
    }),
  }),
})

export const { useGetMessagesQuery } = api
```

This example demonstrates how the [previous example](#websocket-chat-api) can be altered to allow for transforming the response shape when adding data to the cache.

For example, the data is transformed from this shape:

```ts no-transpile
[
  {
    id: 0
    channel: 'redux'
    userName: 'Mark'
    text: 'Welcome to #redux!'
  },
  {
    id: 1
    channel: 'redux'
    userName: 'Lenz'
    text: 'Glad to be here!'
  },
]
```

To this:

```ts no-transpile
{
  // The unique IDs of each item. Must be strings or numbers
  ids: [0, 1],
  // A lookup table mapping entity IDs to the corresponding entity objects
  entities: {
    0: {
      id: 0,
      channel: "redux",
      userName: "Mark",
      text: "Welcome to #redux!",
    },
    1: {
      id: 1,
      channel: "redux",
      userName: "Lenz",
      text: "Glad to be here!",
    },
  },
};
```

A key point to keep in mind is that updates to the cached data within the `onCacheEntryAdded` callback must respect the transformed data shape which will be present for the cached data. The example shows how [`createEntityAdapter`](../../api/createEntityAdapter) can be used for the initial `transformResponse`, and again when streamed updates are received to upsert received items into the cached data, while maintaining the normalized state structure.
